Posted to commits@drill.apache.org by vi...@apache.org on 2018/11/15 23:51:36 UTC

[drill] 07/07: DRILL-6833: Support for pushdown of rowkey based joins

This is an automated email from the ASF dual-hosted git repository.

vitalii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit f6c63bf5dbc7bcd14b202249d013cd974a96a68a
Author: Gautam Parai <gp...@maprtech.com>
AuthorDate: Wed Jan 3 11:06:41 2018 -0800

    DRILL-6833: Support for pushdown of rowkey based joins
    
    closes #1532
---
 .../mapr/db/json/RestrictedJsonRecordReader.java   |  20 +-
 .../drill/maprdb/tests/index/IndexPlanTest.java    | 273 ++++++++++-
 .../apache/drill/exec/planner/PlannerPhase.java    |  16 +
 .../drill/exec/planner/logical/DrillJoinRel.java   |  51 +-
 .../logical/DrillPushRowKeyJoinToScanRule.java     | 544 +++++++++++++++++++++
 .../planner/logical/RowKeyJoinCallContext.java     |  93 ++++
 .../drill/exec/planner/logical/RowKeyJoinRel.java  |  69 +++
 .../drill/exec/planner/physical/JoinPruleBase.java |  54 ++
 .../exec/planner/physical/PlannerSettings.java     |  13 +
 .../exec/planner/physical/RowKeyJoinPrule.java     |  62 +++
 .../planner/sql/handlers/DefaultSqlHandler.java    |  10 +-
 .../exec/server/options/SystemOptionManager.java   |   3 +
 .../java-exec/src/main/resources/drill-module.conf |   3 +
 13 files changed, 1176 insertions(+), 35 deletions(-)
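
As an illustration of the feature added here (a sketch assembled from the session options and
queries exercised in IndexPlanTest below; it is not part of the patch itself): once the rowkey
join conversion threshold is raised, a query whose IN-subquery filters on the row key can be
rewritten to use a RowKeyJoin over a restricted scan.

    alter session set `planner.rowkeyjoin_conversion_selectivity_threshold` = 1.0;

    select t1.id.ssn as ssn
    from hbase.`index_test_primary` t1
    where _id in (select t2._id
                  from hbase.`index_test_primary` t2
                  where cast(t2.activity.irs.firstlogin as timestamp) =
                        to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'));

The new tests assert that the plan for such a query contains a RowKeyJoin, and that setting
`planner.rowkeyjoin_conversion_using_hashjoin` = true produces a HashJoin instead.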

diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
index 1eb4131..bf150c1 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.mapr.db.json;
 
+import com.mapr.db.Table;
 import static org.apache.drill.exec.store.mapr.PluginErrorHandler.dataReadError;
 
 import java.nio.ByteBuffer;
@@ -55,24 +56,6 @@ public class RestrictedJsonRecordReader extends MaprDBJsonRecordReader {
   private int batchSize; // batchSize for rowKey based document get
 
   private String [] projections = null; // multiGet projections
-  public RestrictedJsonRecordReader(MapRDBSubScanSpec subScanSpec,
-                                    MapRDBFormatPlugin formatPlugin,
-                                    List<SchemaPath> projectedColumns, FragmentContext context) {
-
-    super(subScanSpec, formatPlugin, projectedColumns, context);
-    batchSize = (int)context.getOptions().getOption(ExecConstants.QUERY_ROWKEYJOIN_BATCHSIZE);
-    int idx = 0;
-    FieldPath[] scannedFields = this.getScannedFields();
-
-    // only populate projections for non-star query (for star, null is interpreted as all fields)
-    if (!this.isStarQuery() && scannedFields != null && scannedFields.length > 0) {
-      projections = new String[scannedFields.length];
-      for (FieldPath path : scannedFields) {
-        projections[idx] = path.asPathString();
-        ++idx;
-      }
-    }
-  }
 
   public RestrictedJsonRecordReader(MapRDBSubScanSpec subScanSpec,
                                     MapRDBFormatPlugin formatPlugin,
@@ -155,6 +138,7 @@ public class RestrictedJsonRecordReader extends MaprDBJsonRecordReader {
       return 0;
     }
 
+    Table table = super.formatPlugin.getJsonTableCache().getTable(subScanSpec.getTableName(), subScanSpec.getUserName());
     final MultiGet multiGet = new MultiGet((BaseJsonTable) table, condition, false, projections);
     int recordCount = 0;
     DBDocumentReaderBase reader = null;
diff --git a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
index a9de9e3..6754220 100644
--- a/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
+++ b/contrib/format-maprdb/src/test/java/com/mapr/drill/maprdb/tests/index/IndexPlanTest.java
@@ -48,6 +48,7 @@ public class IndexPlanTest extends BaseJsonTest {
   private static final String defaultHavingIndexPlan = "alter session reset `planner.enable_index_planning`";
   private static final String disableHashAgg = "alter session set `planner.enable_hashagg` = false";
   private static final String enableHashAgg =  "alter session set `planner.enable_hashagg` = true";
+  private static final String lowNonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.00001";
   private static final String defaultnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.025";
   private static final String incrnonCoveringSelectivityThreshold = "alter session set `planner.index.noncovering_selectivity_threshold` = 0.25";
   private static final String disableFTS = "alter session set `planner.disable_full_table_scan` = true";
@@ -58,7 +59,10 @@ public class IndexPlanTest extends BaseJsonTest {
       = "alter session set `planner.index.rowkeyjoin_cost_factor` = 0.01";
   private static final String defaultRowKeyJoinBackIOFactor
       = "alter session reset `planner.index.rowkeyjoin_cost_factor`";
-
+  private static final String incrRowKeyJoinConvSelThreshold = "alter session set `planner.rowkeyjoin_conversion_selectivity_threshold` = 1.0";
+  private static final String defaultRowKeyConvSelThreshold = "alter session reset `planner.rowkeyjoin_conversion_selectivity_threshold`";
+  private static final String forceRowKeyJoinConversionUsingHashJoin = "alter session set `planner.rowkeyjoin_conversion_using_hashjoin` = true";
+  private static final String defaultRowKeyJoinConversionUsingHashJoin = "alter session reset `planner.rowkeyjoin_conversion_using_hashjoin`";
   /**
    *  A sample row of this 10K table:
    ------------------+-----------------------------+--------+
@@ -110,6 +114,7 @@ public class IndexPlanTest extends BaseJsonTest {
             "i_state_age_phone", "address.state,personal.age,contact.phone", "name.fname",
             "i_cast_age_income_phone", "$CAST(personal.age@INT),$CAST(personal.income@INT),contact.phone", "name.fname",
             "i_age_with_fname", "personal.age", "name.fname",
+            "i_rowid_cast_date_birthdate", "rowid", "$CAST(personal.birthdate@DATE)",
             "hash_i_reverseid", "reverseid", "",
             "hash_i_cast_timestamp_firstlogin", "$CAST(activity.irs.firstlogin@TIMESTAMP)", "id.ssn"
         };
@@ -1687,4 +1692,270 @@ public class IndexPlanTest extends BaseJsonTest {
       test(sliceTargetDefault);
     }
   }
+
+  @Test
+  public void testRowkeyJoinPushdown_1() throws Exception {
+    // _id IN (select col ...)
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
+        " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_2() throws Exception {
+    // _id = col
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_3() throws Exception {
+    // filters on both sides of the join
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        " where t1._id = t2._id and cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') and cast(t1.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S') ";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_4() throws Exception {
+    // _id = cast(col as int) works since the rowids are internally cast to string!
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        " where t1._id = cast(t2.rowid as int) and cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_5() throws Exception {
+    // _id = cast(cast(col as int) as varchar(10))
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        " where t1._id = cast(cast(t2.rowid as int) as varchar(10)) " +
+        " and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_6() throws Exception {
+    // _id IN (select cast(cast(col as int) as varchar(10)) ... JOIN ...)
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in " +
+        "(select cast(cast(t2.rowid as int) as varchar(10)) from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 " +
+        "where t2.address.city = t3.address.city and cast(t2.activity.irs.firstlogin as timestamp) =  " +
+        "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_7() throws Exception {
+    // with non-covering index
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        "where t1._id = t2.rowid and cast(t2.activity.irs.firstlogin as timestamp) = " +
+        "to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + incrnonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[] {"RowKeyJoin", "RestrictedJsonTableGroupScan", "RowKeyJoin", "indexName=hash_i_cast_timestamp_firstlogin"},
+          new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_8() throws Exception {
+    // with covering index
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        "where t1._id = t2.rowid and t2.rowid = '1012'";
+    try {
+      test(incrRowKeyJoinConvSelThreshold);
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[] {"RowKeyJoin", "indexName=i_rowid_cast_date_birthdate"},
+          new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold);
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_9() throws Exception {
+    // Negative test - rowkey join should not be present
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
+        "(select t2._id from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_10() throws Exception {
+    // Negative test - rowkey join should not be present
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t2 " +
+        " where cast(t1._id as varchar(10)) = cast(t2._id as varchar(10)) and cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_11() throws Exception {
+    // Negative test - rowkey join should not be present
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where cast(_id as varchar(10)) in " +
+        "(select t2._id from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
+        "and cast(t2.activity.irs.firstlogin as timestamp) =  to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {}, new String[] {"RowKeyJoin"});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_12() throws Exception {
+    // JOIN _id IN (select cast(cast(col as int) as varchar(10)) ... JOIN ...) - rowkey join appears in intermediate join order
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1, hbase.`index_test_primary` t4 " +
+        "where t1.address.city = t4.address.city and t1._id in (select cast(cast(t2.rowid as int) as varchar(10)) " +
+        "from hbase.`index_test_primary` t2, hbase.`index_test_primary` t3 where t2.address.city = t3.address.city " +
+        "and cast(t2.activity.irs.firstlogin as timestamp) = to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S')) " +
+        "and t4.address.state = 'pc'";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[] {"HashJoin(.*[\n\r])+.*Scan.*indexName=i_state_city_dl(.*[\n\r])+.*RowKeyJoin(.*[\n\r])+.*RestrictedJsonTableGroupScan(.*[\n\r])+.*HashAgg\\(group=\\[\\{0\\}\\]\\)(.*[\n\r])+.*HashJoin"},
+          new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";");
+    }
+  }
+
+  @Test
+  public void testRowkeyJoinPushdown_13() throws Exception {
+    // Check option planner.rowkeyjoin_conversion_using_hashjoin works as expected!
+    String query = "select t1.id.ssn as ssn from hbase.`index_test_primary` t1 where _id in (select t2._id " +
+        " from hbase.`index_test_primary` t2 where cast(t2.activity.irs.firstlogin as timestamp) = " +
+        " to_timestamp('2013-02-04 22:34:38.0', 'YYYY-MM-dd HH:mm:ss.S'))";
+    try {
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"RowKeyJoin"}, new String[] {});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+      test(incrRowKeyJoinConvSelThreshold + ";" + lowNonCoveringSelectivityThreshold + ";" +
+          forceRowKeyJoinConversionUsingHashJoin + ";");
+      PlanTestBase.testPlanMatchingPatterns(query, new String[] {"HashJoin"}, new String[] {"RowKeyJoin"});
+      testBuilder()
+          .sqlQuery(query)
+          .ordered()
+          .baselineColumns("ssn").baselineValues("100007423")
+          .go();
+    } finally {
+      test(defaultRowKeyConvSelThreshold + ";" + defaultnonCoveringSelectivityThreshold + ";" +
+          defaultRowKeyJoinConversionUsingHashJoin);
+    }
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 17f8da5..91d9d43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
 import org.apache.drill.exec.planner.logical.DrillPushProjectIntoScanRule;
 import org.apache.drill.exec.planner.logical.DrillPushProjectPastFilterRule;
 import org.apache.drill.exec.planner.logical.DrillPushProjectPastJoinRule;
+import org.apache.drill.exec.planner.logical.DrillPushRowKeyJoinToScanRule;
 import org.apache.drill.exec.planner.logical.DrillReduceAggregatesRule;
 import org.apache.drill.exec.planner.logical.DrillReduceExpressionsRule;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
@@ -71,6 +72,7 @@ import org.apache.drill.exec.planner.physical.NestedLoopJoinPrule;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.ProjectPrule;
 import org.apache.drill.exec.planner.physical.PushLimitToTopN;
+import org.apache.drill.exec.planner.physical.RowKeyJoinPrule;
 import org.apache.drill.exec.planner.physical.ScanPrule;
 import org.apache.drill.exec.planner.physical.ScreenPrule;
 import org.apache.drill.exec.planner.physical.SortConvertPrule;
@@ -152,6 +154,19 @@ public enum PlannerPhase {
     }
   },
 
+  ROWKEYJOIN_CONVERSION("Convert Join to RowKeyJoin") {
+    public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
+      List<RelOptRule> rules = Lists.newArrayList();
+      if (context.getPlannerSettings().isRowKeyJoinConversionEnabled()) {
+        rules.add(DrillPushRowKeyJoinToScanRule.JOIN);
+      }
+      return PlannerPhase.mergedRuleSets(
+          RuleSets.ofList(rules),
+          getStorageRules(context, plugins, this)
+      );
+    }
+  },
+
   SUM_CONVERSION("Convert SUM to $SUM0") {
     public RuleSet getRules(OptimizerRulesContext context, Collection<StoragePlugin> plugins) {
       return PlannerPhase.mergedRuleSets(
@@ -494,6 +509,7 @@ public enum PlannerPhase {
     ruleList.add(UnionAllPrule.INSTANCE);
     ruleList.add(ValuesPrule.INSTANCE);
     ruleList.add(DirectScanPrule.INSTANCE);
+    ruleList.add(RowKeyJoinPrule.INSTANCE);
 
     ruleList.add(UnnestPrule.INSTANCE);
     ruleList.add(LateralJoinPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 0126e74..2559d28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -37,6 +37,7 @@ import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.LogicalOperator;
 import org.apache.drill.common.logical.data.Project;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.common.JoinControl;
 import org.apache.drill.exec.planner.torel.ConversionContext;
 
 /**
@@ -49,11 +50,21 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
    * We do not throw InvalidRelException in Logical planning phase. It's up to the post-logical planning check or physical planning
    * to detect the unsupported join type, and throw exception.
    * */
+  private int joinControl = JoinControl.DEFAULT;
+
+  public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+      JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, joinType);
+    assert traits.contains(DrillRel.DRILL_LOGICAL);
+    RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys, filterNulls);
+  }
+
   public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
-      JoinRelType joinType)  {
+      JoinRelType joinType, int joinControl)  {
     super(cluster, traits, left, right, condition, joinType);
     assert traits.contains(DrillRel.DRILL_LOGICAL);
     RelOptUtil.splitJoinCondition(left, right, condition, leftKeys, rightKeys, filterNulls);
+    this.joinControl = joinControl;
   }
 
   public DrillJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
@@ -66,7 +77,6 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
     this.rightKeys = rightKeys;
   }
 
-
   @Override
   public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     return new DrillJoinRel(getCluster(), traitSet, left, right, condition, joinType);
@@ -144,28 +154,39 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
     return builder.build();
   }
 
-  public static DrillJoinRel convert(Join join, ConversionContext context) throws InvalidRelException{
+  protected static Pair<RelNode, RelNode> getJoinInputs(Join join, ConversionContext context) throws InvalidRelException {
     RelNode left = context.toRel(join.getLeft());
     RelNode right = context.toRel(join.getRight());
+    return Pair.of(left, right);
+  }
 
+  protected static RexNode getJoinCondition(Join join, ConversionContext context) throws InvalidRelException {
+    Pair<RelNode, RelNode> inputs = getJoinInputs(join, context);
     List<RexNode> joinConditions = new ArrayList<RexNode>();
     // right fields appear after the LHS fields.
-    final int rightInputOffset = left.getRowType().getFieldCount();
+    final int rightInputOffset = inputs.left.getRowType().getFieldCount();
     for (JoinCondition condition : join.getConditions()) {
-      RelDataTypeField leftField = left.getRowType().getField(ExprHelper.getFieldName(condition.getLeft()), true, false);
-      RelDataTypeField rightField = right.getRowType().getField(ExprHelper.getFieldName(condition.getRight()), true, false);
-        joinConditions.add(
-            context.getRexBuilder().makeCall(
-                SqlStdOperatorTable.EQUALS,
-                context.getRexBuilder().makeInputRef(leftField.getType(), leftField.getIndex()),
-                context.getRexBuilder().makeInputRef(rightField.getType(), rightInputOffset + rightField.getIndex())
-                )
-                );
+      RelDataTypeField leftField = inputs.left.getRowType().getField(ExprHelper.getFieldName(condition.getLeft()),
+          true, false);
+      RelDataTypeField rightField = inputs.right.getRowType().getField(ExprHelper.getFieldName(condition.getRight()),
+          true, false);
+      joinConditions.add(
+          context.getRexBuilder().makeCall(
+              SqlStdOperatorTable.EQUALS,
+              context.getRexBuilder().makeInputRef(leftField.getType(), leftField.getIndex()),
+              context.getRexBuilder().makeInputRef(rightField.getType(), rightInputOffset + rightField.getIndex())
+          )
+      );
     }
     RexNode rexCondition = RexUtil.composeConjunction(context.getRexBuilder(), joinConditions, false);
-    DrillJoinRel joinRel = new DrillJoinRel(context.getCluster(), context.getLogicalTraits(), left, right, rexCondition, join.getJoinType());
+    return rexCondition;
+  }
 
+  public static DrillJoinRel convert(Join join, ConversionContext context) throws InvalidRelException{
+    Pair<RelNode, RelNode> inputs = getJoinInputs(join, context);
+    RexNode rexCondition = getJoinCondition(join, context);
+    DrillJoinRel joinRel = new DrillJoinRel(context.getCluster(), context.getLogicalTraits(),
+        inputs.left, inputs.right, rexCondition, join.getJoinType());
     return joinRel;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java
new file mode 100644
index 0000000..7c0a9b7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushRowKeyJoinToScanRule.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.planner.logical.RowKeyJoinCallContext.RowKey;
+import org.apache.drill.exec.physical.base.DbGroupScan;
+import org.apache.drill.exec.planner.index.rules.MatchFunction;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This rule implements run-time filter pushdown via a rowkey join for queries with row-key filters. Row-key
+ * filters are filters on primary keys, which appear in database group scans ({@link DbGroupScan}).
+ *
+ * Consider the following query:
+ * SELECT L.LINEITEM_ID FROM LINEITEM L WHERE L._ID IN (SELECT O.LID FROM ORDERS O WHERE O.ORDER_DATE > '2019-01-01');
+ * With this rule the logical plan on the left would transform to the logical plan on the right:
+ * Project                                                Project
+ *   Join (L._ID = O.LID)                                   RowKeyJoin (L._ID = O.LID)
+ *     LineItem L                                ====>>       Lineitem L
+ *     Filter (ORDER_DATE > '2019-01-01')                     Filter (ORDER_DATE > '2019-01-01')
+ *       Orders O                                               Orders O
+ *
+ * During physical planning, the plan on the left would end up with e.g. a HashJoin, whereas the transformed plan
+ * would have a RowKeyJoin along with a Restricted GroupScan instead.
+ * Project                                                Project
+ *   HashJoin (L._ID = O.LID)                               RowKeyJoin (L._ID = O.LID)
+ *     Scan (LineItem L)                                      RestrictedScan (Lineitem L)
+ *     Filter (ORDER_DATE > '2019-01-01')                     Filter (ORDER_DATE > '2019-01-01')
+ *       Scan (Orders O)                                        Scan (Orders O)
+ *
+ * The row-key join pushes the row-keys of the rows satisfying the filter into the Lineitem restricted group scan, so
+ * only the rows matching those row-keys are fetched, instead of feeding all rows into the HashJoin.
+ */
+public class DrillPushRowKeyJoinToScanRule extends RelOptRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillPushRowKeyJoinToScanRule.class);
+  final public MatchFunction match;
+
+  private DrillPushRowKeyJoinToScanRule(RelOptRuleOperand operand, String description, MatchFunction match) {
+    super(operand, description);
+    this.match = match;
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    return match.match(call);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    doOnMatch((RowKeyJoinCallContext) match.onMatch(call));
+  }
+
+  public static DrillPushRowKeyJoinToScanRule JOIN = new DrillPushRowKeyJoinToScanRule(
+      RelOptHelper.any(DrillJoinRel.class), "DrillPushRowKeyJoinToScanRule_Join", new MatchRelJ());
+
+  public static class MatchRelJ implements MatchFunction<RowKeyJoinCallContext> {
+    /*
+     * Returns the rels matching the specified sequence relSequence. The match is executed
+     * beginning from startingRel. An example of such a sequence is Join->Filter->Project->Scan
+     */
+    private List<RelNode> findRelSequence(Class[] relSequence, RelNode startingRel) {
+      List<RelNode> matchingRels = new ArrayList<>();
+      findRelSequenceInternal(relSequence, 0, startingRel, matchingRels);
+      return matchingRels;
+    }
+    /*
+     * Recursively match until the sequence is satisfied. Otherwise return. Recurse down intermediate nodes
+     * such as RelSubset/HepRelVertex.
+     */
+    private void findRelSequenceInternal(Class[] classes, int idx, RelNode rel, List<RelNode> matchingRels) {
+      if (rel instanceof HepRelVertex) {
+        findRelSequenceInternal(classes, idx, ((HepRelVertex) rel).getCurrentRel(), matchingRels);
+      } else if (rel instanceof RelSubset) {
+        if (((RelSubset) rel).getBest() != null) {
+          findRelSequenceInternal(classes, idx, ((RelSubset) rel).getBest(), matchingRels);
+        } else {
+          findRelSequenceInternal(classes, idx, ((RelSubset) rel).getOriginal(), matchingRels);
+        }
+      } else if (classes[idx].isInstance(rel)) {
+        matchingRels.add(rel);
+        if (idx + 1 < classes.length && rel.getInputs().size() > 0) {
+          findRelSequenceInternal(classes, idx + 1, rel.getInput(0), matchingRels);
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          String sequence, matchingSequence;
+          StringBuffer sb = new StringBuffer();
+          for (int i = 0; i < classes.length; i++) {
+            if (i == classes.length - 1) {
+              sb.append(classes[i].getCanonicalName().toString());
+            } else {
+              sb.append(classes[i].getCanonicalName().toString() + "->");
+            }
+          }
+          sequence = sb.toString();
+          sb.delete(0, sb.length());
+          for (int i = 0; i < matchingRels.size(); i++) {
+            if (i == matchingRels.size() - 1) {
+              sb.append(matchingRels.get(i).getClass().getCanonicalName().toString());
+            } else {
+              sb.append(matchingRels.get(i).getClass().getCanonicalName().toString() + "->");
+            }
+          }
+          matchingSequence = sb.toString();
+          logger.debug("FindRelSequence: ABORT: Unexpected Rel={}, After={}, CurSeq={}",
+              rel.getClass().getCanonicalName().toString(), matchingSequence, sequence);
+        }
+        matchingRels.clear();
+      }
+    }
+
+    /*
+     * Generate the rowkeyjoin call context. This context is useful when generating the transformed
+     * plan nodes. It tries to identify some RelNode sequences e.g. Filter-Project-Scan and generates
+     * the context based on the identified sequence.
+     */
+    private RowKeyJoinCallContext generateContext(RelOptRuleCall call, DrillJoinRel joinRel,
+      RelNode joinChildRel, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs) {
+      List<RelNode> matchingRels;
+      // Sequence of rels (PFPS, FPS, PS, FS, S) matched for this rule
+      Class[] PFPS = new Class[] {DrillProjectRel.class, DrillFilterRel.class, DrillProjectRel.class, DrillScanRel.class};
+      Class[] FPS = new Class[] {DrillFilterRel.class, DrillProjectRel.class, DrillScanRel.class};
+      Class[] PS = new Class[] {DrillProjectRel.class, DrillScanRel.class};
+      Class[] FS = new Class[] {DrillFilterRel.class, DrillScanRel.class};
+      Class[] S = new Class[] {DrillScanRel.class};
+      logger.debug("GenerateContext(): Primary-key: Side={}, RowTypePos={}, SwapInputs={}",
+          rowKeyLoc.name(), rowKeyPos, swapInputs);
+      matchingRels = findRelSequence(PFPS, joinChildRel);
+      if (matchingRels.size() > 0) {
+        logger.debug("Matched rel sequence : Project->Filter->Project->Scan");
+        return new RowKeyJoinCallContext(call, rowKeyLoc, rowKeyPos, swapInputs, joinRel,
+            (DrillProjectRel) matchingRels.get(0), (DrillFilterRel) matchingRels.get(1),
+            (DrillProjectRel) matchingRels.get(2), (DrillScanRel) matchingRels.get(3));
+      }
+      matchingRels = findRelSequence(FPS, joinChildRel);
+      if (matchingRels.size() > 0) {
+        logger.debug("Matched rel sequence : Filter->Project->Scan");
+        return new RowKeyJoinCallContext(call, rowKeyLoc, rowKeyPos, swapInputs, joinRel,
+            null, (DrillFilterRel) matchingRels.get(0), (DrillProjectRel) matchingRels.get(1),
+            (DrillScanRel) matchingRels.get(2));
+      }
+      matchingRels = findRelSequence(PS, joinChildRel);
+      if (matchingRels.size() > 0) {
+        logger.debug("Matched rel sequence : Project->Scan");
+        return new RowKeyJoinCallContext(call, rowKeyLoc, rowKeyPos, swapInputs, joinRel, null,
+            null, (DrillProjectRel) matchingRels.get(0), (DrillScanRel) matchingRels.get(1));
+      }
+      matchingRels = findRelSequence(FS, joinChildRel);
+      if (matchingRels.size() > 0) {
+        logger.debug("Matched rel sequence : Filter->Scan");
+        return new RowKeyJoinCallContext(call, rowKeyLoc, rowKeyPos, swapInputs, joinRel, null,
+            (DrillFilterRel) matchingRels.get(0), null, (DrillScanRel) matchingRels.get(1));
+      }
+      matchingRels = findRelSequence(S, joinChildRel);
+      if (matchingRels.size() > 0) {
+        logger.debug("Matched rel sequence : Scan");
+        return new RowKeyJoinCallContext(call, rowKeyLoc, rowKeyPos, swapInputs, joinRel, null, null,
+            null, (DrillScanRel) matchingRels.get(0));
+      }
+      logger.debug("Matched rel sequence : None");
+      return new RowKeyJoinCallContext(call, RowKey.NONE, -1, false, null, null, null, null, null);
+    }
+
+    @Override
+    public boolean match(RelOptRuleCall call) {
+      DrillJoinRel joinRel = call.rel(0);
+      //Perform validity checks
+      logger.debug("DrillPushRowKeyJoinToScanRule begin()");
+      return canPushRowKeyJoinToScan(joinRel, call.getPlanner()).left;
+    }
+
+    @Override
+    public RowKeyJoinCallContext onMatch(RelOptRuleCall call) {
+      DrillJoinRel joinRel = call.rel(0);
+      /*
+       * Find which side of the join (left/right) has the primary-key column. Then find which sequence of rels
+       * is present on that side of the join. We will need this sequence to correctly transform the left
+       * side of the join.
+       */
+      Pair<Boolean, Pair<RowKey, Integer>> res = canPushRowKeyJoinToScan(joinRel, call.getPlanner());
+      if (res.left) {
+        if (res.right.left == RowKey.LEFT) {
+          return generateContext(call, joinRel, joinRel.getLeft(), res.right.left, res.right.right, false);
+        } else if (res.right.left == RowKey.RIGHT) {
+          // If the primary-key column is present on the right, swapping of inputs is required. Find out if possible!
+          if (canSwapJoinInputs(joinRel, res.right.left)) {
+            return generateContext(call, joinRel, joinRel.getRight(), res.right.left, res.right.right, true);
+          }
+        } else if (res.right.left == RowKey.BOTH) {
+          // Create row key join without swapping inputs, since either side of the join is eligible.
+          return generateContext(call, joinRel, joinRel.getLeft(), res.right.left, res.right.right, false);
+        }
+      }
+      return new RowKeyJoinCallContext(call, RowKey.NONE, -1, false, null, null, null, null, null);
+    }
+  }
+
+  /* Assumption: only the non-rowkey side needs to be checked. The row-key side is assumed to have
+   * no blocking operators, otherwise the transformation would not work.
+   */
+  private static boolean canSwapJoinInputs(DrillJoinRel joinRel, RowKey rowKeyLocation) {
+    // We cannot swap the join inputs if the join is a semi-join. We determine this indirectly, by
+    // checking for the presence of an aggregating Aggregate rel (one that computes aggregates, e.g. sum).
+    if (rowKeyLocation == RowKey.LEFT
+        || rowKeyLocation == RowKey.BOTH) {
+      return canSwapJoinInputsInternal(joinRel.getRight());
+    } else if (rowKeyLocation == RowKey.RIGHT) {
+      // If the rowkey occurs on the right side, don't swap since it can potentially cause
+      // wrong results unless we make additional changes to fix-up column ordinals for the
+      // join condition as well as the parent/ancestors of the Join.
+
+      // return canSwapJoinInputsInternal(joinRel.getLeft());
+      return false;
+    }
+    return false;
+  }
+
+  /* Recurse down to find an aggregate (DrillAggregateRel). For semi-joins, Calcite adds an aggregate
+   * without any agg expressions.
+   */
+  private static boolean canSwapJoinInputsInternal(RelNode rel) {
+    if (rel instanceof DrillAggregateRel &&
+        ((DrillAggregateRel) rel).getAggCallList().size() > 0) {
+      return false;
+    } else if (rel instanceof HepRelVertex) {
+      return canSwapJoinInputsInternal(((HepRelVertex) rel).getCurrentRel());
+    } else if (rel instanceof RelSubset) {
+      if (((RelSubset) rel).getBest() != null) {
+        return canSwapJoinInputsInternal(((RelSubset) rel).getBest());
+      } else {
+        return canSwapJoinInputsInternal(((RelSubset) rel).getOriginal());
+      }
+    } else {
+      for (RelNode child : rel.getInputs()) {
+        if (!canSwapJoinInputsInternal(child)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /*
+   * Returns whether the join condition can be pushed (via the rowkey join mechanism). It returns true/false along with
+   * whether the rowkey is present on the left/right side of the join and its 0-based index in the projection of that
+   * side.
+   */
+  private static Pair<Boolean, Pair<RowKey, Integer>> canPushRowKeyJoinToScan(DrillJoinRel joinRel, RelOptPlanner planner) {
+    RowKey rowKeyLoc = RowKey.NONE;
+    logger.debug("canPushRowKeyJoinToScan(): Check: Rel={}", joinRel);
+
+    if (joinRel instanceof RowKeyJoinRel) {
+      logger.debug("SKIP: Join is a RowKeyJoin");
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    }
+
+    if (joinRel.getJoinType() != JoinRelType.INNER) {
+      logger.debug("SKIP: JoinType={} - NOT an INNER join", joinRel.getJoinType());
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    }
+
+    // Single column equality condition
+    if (joinRel.getCondition().getKind() != SqlKind.EQUALS
+        || joinRel.getLeftKeys().size() != 1
+        || joinRel.getRightKeys().size() != 1) {
+      logger.debug("SKIP: #LeftKeys={}, #RightKeys={} - NOT single predicate join condition",
+          joinRel.getLeftKeys().size(), joinRel.getRightKeys().size());
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    }
+
+    // Join condition is of type primary-key = Col
+    boolean hasLeftRowKeyCol = false;
+    boolean hasRightRowKeyCol = false;
+    int leftRowKeyPos = -1;
+    int rightRowKeyPos = -1;
+    if (joinRel.getCondition() instanceof RexCall) {
+      for (RexNode op : ((RexCall) joinRel.getCondition()).getOperands()) {
+        // Only support rowkey column (no expressions involving rowkey column)
+        if (op instanceof RexInputRef) {
+          //Check the left/right sides of the join to find the primary-key column
+          int pos = ((RexInputRef)op).getIndex();
+          if (pos < joinRel.getLeft().getRowType().getFieldList().size()) {
+            if (isRowKeyColumn(((RexInputRef) op).getIndex(), joinRel.getLeft())) {
+              logger.debug("FOUND Primary-key: Side=LEFT, RowType={}", joinRel.getLeft().getRowType());
+              hasLeftRowKeyCol = true;
+              leftRowKeyPos = pos;
+              break;
+            }
+          } else {
+            if (isRowKeyColumn(pos - joinRel.getLeft().getRowType().getFieldList().size(), joinRel.getRight())) {
+              logger.debug("FOUND Primary-key: Side=RIGHT, RowType={}", joinRel.getRight().getRowType());
+              hasRightRowKeyCol = true;
+              rightRowKeyPos = pos;
+              break;
+            }
+          }
+        }
+      }
+    }
+    if (!hasLeftRowKeyCol && !hasRightRowKeyCol) {
+      logger.debug("SKIP: Primary-key = column condition NOT found");
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    }
+    /* Get the scan rel on left/right side of the join (at least one of them should be non-null for us
+     * to proceed). This would be the side with the primary-key column and would be later transformed to restricted
+     * group scan.
+     */
+    RelNode leftScan = getValidJoinInput(joinRel.getLeft());
+    RelNode rightScan = getValidJoinInput(joinRel.getRight());
+
+    if (leftScan == null && rightScan == null) {
+      logger.debug("SKIP: Blocking operators between join and scans");
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    }
+    // Only valid if the side with the primary-key column does not have any blocking operations, e.g. aggregates
+    if (leftScan != null && hasLeftRowKeyCol) {
+      rowKeyLoc = RowKey.LEFT;
+    }
+    if (rightScan != null && hasRightRowKeyCol) {
+      if (rowKeyLoc == RowKey.LEFT) {
+        rowKeyLoc = RowKey.BOTH;
+      } else {
+        rowKeyLoc = RowKey.RIGHT;
+      }
+    }
+    // Heuristic: only generate such plans if the selectivity is less than the rowkey join conversion selectivity
+    // threshold. Rowkey join plans do random scans and hence are expensive. Since this transformation takes place
+    // in the HEP planner, it is not costed, so this heuristic helps avoid generating a potentially expensive plan.
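+    // For example (illustrative numbers only): if the filter side is estimated at 100 rows and the
+    // row-key side scan at 10,000 rows, the selectivity is 100 / 10,000 = 0.01, so the conversion
+    // is applied only if the configured threshold is at least 0.01.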
+    RelMetadataQuery mq = RelMetadataQuery.instance();
+    double ncSel = PrelUtil.getPlannerSettings(planner).getRowKeyJoinConversionSelThreshold();
+    double sel;
+    if (rowKeyLoc == RowKey.NONE) {
+      return Pair.of(false, Pair.of(rowKeyLoc, -1));
+    } else if (rowKeyLoc == RowKey.LEFT) {
+      sel = computeSelectivity(joinRel.getRight().estimateRowCount(mq), leftScan.estimateRowCount(mq));
+      if (sel > ncSel) {
+        logger.debug("SKIP: SEL= {}/{} = {}\\%, THRESHOLD={}\\%",
+            joinRel.getRight().estimateRowCount(mq), leftScan.estimateRowCount(mq), sel*100.0, ncSel*100.0);
+        return Pair.of(false, Pair.of(rowKeyLoc, -1));
+      }
+    } else {
+      sel = computeSelectivity(joinRel.getLeft().estimateRowCount(mq), rightScan.estimateRowCount(mq));
+      if (sel > ncSel) {
+        logger.debug("SKIP: SEL= {}/{} = {}\\%, THRESHOLD={}\\%",
+            joinRel.getLeft().estimateRowCount(mq), rightScan.estimateRowCount(mq), sel*100.0, ncSel*100.0);
+        return Pair.of(false, Pair.of(rowKeyLoc, -1));
+      }
+    }
+    int rowKeyPos = rowKeyLoc == RowKey.RIGHT ? rightRowKeyPos : leftRowKeyPos;
+    logger.info("FOUND Primary-key: Side={}, RowTypePos={}, Sel={}, Threshold={}",
+        rowKeyLoc.name(), rowKeyPos, sel, ncSel);
+    return Pair.of(true, Pair.of(rowKeyLoc, rowKeyPos));
+  }
+
+  /*
+   * Computes the selectivity given the number of rows selected from the total rows
+   */
+  private static double computeSelectivity(double selectRows, double totalRows) {
+    if (totalRows <= 0) {
+      return 1.0;
+    }
+    return Math.min(1.0, Math.max(0.0, selectRows/totalRows));
+  }
+
+  /* Finds the scan rel underlying the given rel. No blocking operators should
+   * be present in between. Currently, the rowkeyjoin operator cannot send rowkeys
+   * across major fragment boundaries. The presence of blocking operators can
+   * lead to the creation of a fragment boundary, hence the limitation. Once we can
+   * send rowkeys across fragment boundaries, this restriction can be removed.
+   */
+  public static RelNode getValidJoinInput(RelNode rel) {
+    if (rel instanceof DrillScanRel) {
+      return rel;
+    } else if (rel instanceof DrillProjectRel
+        || rel instanceof DrillFilterRel
+        || rel instanceof DrillLimitRel) {
+      for (RelNode child : rel.getInputs()) {
+        RelNode tgt = getValidJoinInput(child);
+        if (tgt != null) {
+          return tgt;
+        }
+      }
+    } else if (rel instanceof HepRelVertex) {
+      return getValidJoinInput(((HepRelVertex) rel).getCurrentRel());
+    } else if (rel instanceof RelSubset) {
+      if (((RelSubset) rel).getBest() != null) {
+        return getValidJoinInput(((RelSubset) rel).getBest());
+      } else {
+        return getValidJoinInput(((RelSubset) rel).getOriginal());
+      }
+    }
+    return null;
+  }
+
+  /* Finds whether the given column reference is for the rowkey col (also known as the primary-key col).
+   * We need to recurse down the operators, following their column references down to the scan,
+   * to figure out whether the reference is a rowkey col. Projections can rearrange the
+   * incoming columns. We also need to handle HepRelVertex/RelSubset while traversing the rels.
+   */
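+  // For example (illustrative): if a DrillProjectRel above the scan has exprs [$3, $1], an input
+  // ref 0 at that project maps to column 3 of its child; the loop below follows such mappings
+  // until it reaches the scan and can compare the column name against the rowkey column.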
+  private static boolean isRowKeyColumn(int index, RelNode rel) {
+    RelNode curRel = rel;
+    int curIndex = index;
+    while (curRel != null && !(curRel instanceof DrillScanRel)) {
+      logger.debug("IsRowKeyColumn: Rel={}, RowTypePos={}, RowType={}", curRel.toString(), curIndex,
+          curRel.getRowType().toString());
+      if (curRel instanceof HepRelVertex) {
+        curRel = ((HepRelVertex) curRel).getCurrentRel();
+      } else if (curRel instanceof RelSubset) {
+        if (((RelSubset) curRel).getBest() != null) {
+          curRel = ((RelSubset) curRel).getBest();
+        } else {
+          curRel = ((RelSubset) curRel).getOriginal();
+        }
+      } else {
+        RelNode child = null;
+        // For multi-input parent rels, find the 0-based index in the child rel,
+        // before recursing down that child rel.
+        for (RelNode input : curRel.getInputs()) {
+          if (input.getRowType().getFieldList().size() <= curIndex) {
+            curIndex -= input.getRowType().getFieldList().size();
+          } else {
+            child = input;
+            break;
+          }
+        }
+        curRel = child;
+      }
+      // If no exprs are present in the projection, the column index remains the same in the child.
+      // Otherwise, the column index is the `RexInputRef` index.
+      if (curRel != null && curRel instanceof DrillProjectRel) {
+        List<RexNode> childExprs = curRel.getChildExps();
+        if (childExprs != null && childExprs.size() > 0) {
+          if (childExprs.get(curIndex) instanceof RexInputRef) {
+            curIndex = ((RexInputRef) childExprs.get(curIndex)).getIndex();
+          } else {
+            // Expressions on the rowkey col are currently not supported, so if an expr is present,
+            // return false.
+            logger.debug("IsRowKeyColumn: ABORT: Primary-key EXPR$={}", childExprs.get(curIndex).toString());
+            return false;
+          }
+        }
+      }
+    }
+    logger.debug("IsRowKeyColumn:Primary-key Col={} ",
+        curRel != null ? curRel.getRowType().getFieldNames().get(curIndex) : "??");
+    // Get the primary-key col name from the scan and match with the column being referenced.
+    if (curRel != null && curRel instanceof DrillScanRel) {
+      if (((DrillScanRel) curRel).getGroupScan() instanceof DbGroupScan) {
+        DbGroupScan dbGroupScan = (DbGroupScan) ((DrillScanRel) curRel).getGroupScan();
+        String rowKeyName = dbGroupScan.getRowKeyName();
+        DbGroupScan restrictedGroupScan = dbGroupScan.getRestrictedScan(((DrillScanRel)curRel).getColumns());
+        // Also verify this scan supports restricted group scans (random seeks)
+        if (restrictedGroupScan != null &&
+            curRel.getRowType().getFieldNames().get(curIndex).equalsIgnoreCase(rowKeyName)) {
+          logger.debug("IsRowKeyColumn: FOUND: Rel={}, RowTypePos={}, RowType={}",
+              curRel.toString(), curIndex, curRel.getRowType().toString());
+          return true;
+        }
+      }
+    }
+    logger.debug("IsRowKeyColumn: NOT FOUND");
+    return false;
+  }
+
+  protected void doOnMatch(RowKeyJoinCallContext rkjCallContext) {
+    if (rkjCallContext.getRowKeyLocation() != RowKey.NONE) {
+      doOnMatch(rkjCallContext.getCall(), rkjCallContext.getRowKeyPosition(), rkjCallContext.mustSwapInputs(),
+          rkjCallContext.getJoinRel(), rkjCallContext.getUpperProjectRel(), rkjCallContext.getFilterRel(),
+          rkjCallContext.getLowerProjectRel(), rkjCallContext.getScanRel());
+    }
+  }
+
+  private void doOnMatch(RelOptRuleCall call, int rowKeyPosition, boolean swapInputs, DrillJoinRel joinRel,
+      DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel, DrillScanRel scanRel) {
+    // Swap the inputs, when necessary (i.e. when the primary-key col is on the right side of the join)
+    logger.debug("Transforming: swapInputs = {}", swapInputs);
+    RelNode right = swapInputs ? joinRel.getLeft() : joinRel.getRight();
+    // The join condition is primary-key = COL, similar to a PK-FK relationship in relational DBs,
+    // where the primary-key is the PK and COL is the FK
+    List<Integer> leftJoinKeys = ImmutableList.of(rowKeyPosition);
+    List<Integer> rightJoinKeys = swapInputs ? joinRel.getLeftKeys() : joinRel.getRightKeys();
+    // Create restricted group scan for scanRel and reconstruct the left side of the join.
+    DbGroupScan restrictedGroupScan = ((DbGroupScan)scanRel.getGroupScan()).getRestrictedScan(
+        scanRel.getColumns());
+    RelNode leftRel =  new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet(), scanRel.getTable(),
+        restrictedGroupScan, scanRel.getRowType(), scanRel.getColumns(), scanRel.partitionFilterPushdown());
+    // Transform the project/filter rels if present
+    if (lowerProjectRel != null) {
+      leftRel = lowerProjectRel.copy(lowerProjectRel.getTraitSet(), ImmutableList.of(leftRel));
+    }
+    if (filterRel != null) {
+      leftRel = filterRel.copy(filterRel.getTraitSet(), leftRel, filterRel.getCondition());
+    }
+    if (upperProjectRel != null) {
+      leftRel = upperProjectRel.copy(upperProjectRel.getTraitSet(), ImmutableList.of(leftRel));
+    }
+    // Create the equi-join condition for the rowkey join
+    RexNode joinCondition =
+        RelOptUtil.createEquiJoinCondition(leftRel, leftJoinKeys,
+            right, rightJoinKeys, joinRel.getCluster().getRexBuilder());
+    logger.debug("Transforming: LeftKeys={}, LeftRowType={}, RightKeys={}, RightRowType={}",
+        leftJoinKeys, leftRel.getRowType(), rightJoinKeys, right.getRowType());
+    RowKeyJoinRel rowKeyJoin = new RowKeyJoinRel(joinRel.getCluster(), joinRel.getTraitSet(), leftRel, right,
+        joinCondition, joinRel.getJoinType());
+    logger.info("Transforming: SUCCESS: Register runtime filter pushdown plan (rowkeyjoin)");
+    call.transformTo(rowKeyJoin);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java
new file mode 100644
index 0000000..b82e77c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinCallContext.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+/*
+ * Preserves the context needed to transform a join into a rowkey join,
+ * which enables join filter pushdown.
+ */
+public class RowKeyJoinCallContext {
+
+  public enum RowKey {NONE, LEFT, RIGHT, BOTH};
+  // RelOptRule call
+  private RelOptRuleCall call;
+  // Which side of the join the row key is present on
+  private RowKey rowKeyLoc;
+  // 0-based index of the row-key column in the join input
+  private int rowKeyPos;
+  // whether the row-key join inputs need to be swapped
+  private boolean swapInputs;
+  private DrillJoinRel joinRel;
+  // rels on the rowkey side of the join to be transformed
+  private DrillProjectRel upperProjectRel;
+  private DrillFilterRel filterRel;
+  private DrillProjectRel lowerProjectRel;
+  private DrillScanRel scanRel;
+
+  public RowKeyJoinCallContext (RelOptRuleCall call, RowKey rowKeyLoc, int rowKeyPos, boolean swapInputs,
+      DrillJoinRel joinRel, DrillProjectRel upperProjectRel, DrillFilterRel filterRel, DrillProjectRel lowerProjectRel,
+          DrillScanRel scanRel) {
+    this.call = call;
+    this.rowKeyLoc = rowKeyLoc;
+    this.rowKeyPos = rowKeyPos;
+    this.swapInputs = swapInputs;
+    this.joinRel = joinRel;
+    this.upperProjectRel = upperProjectRel;
+    this.filterRel = filterRel;
+    this.lowerProjectRel = lowerProjectRel;
+    this.scanRel = scanRel;
+  }
+
+  public RelOptRuleCall getCall() {
+    return call;
+  }
+
+  public RowKey getRowKeyLocation() {
+    return rowKeyLoc;
+  }
+
+  public int getRowKeyPosition() {
+    return rowKeyPos;
+  }
+
+  public boolean mustSwapInputs() {
+    return swapInputs;
+  }
+
+  public DrillJoinRel getJoinRel() {
+    return joinRel;
+  }
+
+  public DrillProjectRel getUpperProjectRel() {
+    return upperProjectRel;
+  }
+
+  public DrillFilterRel getFilterRel() {
+    return filterRel;
+  }
+
+  public DrillProjectRel getLowerProjectRel() {
+    return lowerProjectRel;
+  }
+
+  public DrillScanRel getScanRel() {
+    return scanRel;
+  }
+}
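
A minimal, hypothetical sketch of how a matching rule might populate and consume this
context. The local rel variables and the doTransform helper are assumptions made for
illustration; the real call sites live in DrillPushRowKeyJoinToScanRule above.

  // Inside a rule's onMatch(), once the operands have identified the row-key side:
  RowKeyJoinCallContext ctx = new RowKeyJoinCallContext(
      call,                                  // the RelOptRuleCall being transformed
      RowKeyJoinCallContext.RowKey.LEFT,     // row key found on the left join input
      0,                                     // 0-based ordinal of the row-key column
      false,                                 // inputs already in the expected order, no swap needed
      joinRel, upperProject, filter, lowerProject, scan);

  if (ctx.getRowKeyLocation() != RowKeyJoinCallContext.RowKey.NONE) {
    doTransform(ctx);   // hypothetical hand-off to the rewrite that builds the RowKeyJoinRel
  }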
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java
new file mode 100644
index 0000000..2f73526
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RowKeyJoinRel.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+
+import java.util.List;
+
+public class RowKeyJoinRel extends DrillJoinRel implements DrillRel {
+
+  public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                      JoinRelType joinType)  {
+    super(cluster, traits, left, right, condition, joinType);
+  }
+
+  public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                      JoinRelType joinType, int joinControl)  {
+    super(cluster, traits, left, right, condition, joinType, joinControl);
+  }
+
+  public RowKeyJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+                      JoinRelType joinType, List<Integer> leftKeys, List<Integer> rightKeys) throws InvalidRelException {
+    super(cluster, traits, left, right, condition, joinType, leftKeys, rightKeys);
+  }
+
+  @Override
+  public RowKeyJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType,
+      boolean semiJoinDone) {
+    return new RowKeyJoinRel(getCluster(), traitSet, left, right, condition, joinType);
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    return super.implement(implementor);
+  }
+
+  public static RowKeyJoinRel convert(Join join, ConversionContext context) throws InvalidRelException {
+    Pair<RelNode, RelNode> inputs = getJoinInputs(join, context);
+    RexNode rexCondition = getJoinCondition(join, context);
+    RowKeyJoinRel joinRel = new RowKeyJoinRel(context.getCluster(), context.getLogicalTraits(),
+        inputs.left, inputs.right, rexCondition, join.getJoinType());
+    return joinRel;
+  }
+}
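
RowKeyJoinRel carries no extra state; it exists as a distinct logical join type so that
later planner rules can recognize a join already chosen for row-key pushdown. A small,
assumed check (not in the patch) showing that copy() preserves that marker type:

  // 'original' is assumed to be an existing RowKeyJoinRel instance.
  RowKeyJoinRel copied = original.copy(original.getTraitSet(), original.getCondition(),
      original.getLeft(), original.getRight(), original.getJoinType(), false /* semiJoinDone */);
  assert copied instanceof RowKeyJoinRel;   // RowKeyJoinPrule (below) matches on this type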
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 3665401..a589fcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -19,10 +19,16 @@ package org.apache.drill.exec.planner.physical;
 
 import java.util.List;
 
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.physical.base.DbGroupScan;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
 import org.apache.drill.exec.planner.common.DrillJoinRelBase;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.common.JoinControl;
 import org.apache.drill.exec.planner.logical.DrillJoin;
+import org.apache.drill.exec.planner.logical.DrillPushRowKeyJoinToScanRule;
+import org.apache.drill.exec.planner.logical.RowKeyJoinRel;
 import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -78,6 +84,54 @@ public abstract class JoinPruleBase extends Prule {
     return false;
   }
 
+  protected void createRangePartitionRightPlan(RelOptRuleCall call, RowKeyJoinRel join,
+    PhysicalJoinType physicalJoinType, boolean implementAsRowKeyJoin, RelNode left, RelNode right,
+    RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
+    assert join.getRightKeys().size() == 1 : "Cannot create range partition plan with multi-column join condition";
+    int joinKeyRight = join.getRightKeys().get(0);
+    List<DrillDistributionTrait.DistributionField> rangeDistFields =
+        Lists.newArrayList(new DrillDistributionTrait.DistributionField(joinKeyRight /* `rowkey equivalent` ordinal on the right side */));
+    List<FieldReference> rangeDistRefList = Lists.newArrayList();
+    FieldReference rangeDistRef =
+        FieldReference.getWithQuotedRef(right.getRowType().getFieldList().get(joinKeyRight).getName());
+    rangeDistRefList.add(rangeDistRef);
+    RelNode leftScan = DrillPushRowKeyJoinToScanRule.getValidJoinInput(left);
+    DrillDistributionTrait rangePartRight = new DrillDistributionTrait(
+        DrillDistributionTrait.DistributionType.RANGE_DISTRIBUTED,
+        ImmutableList.copyOf(rangeDistFields),
+        ((DbGroupScan)((DrillScanRelBase) leftScan).getGroupScan()).getRangePartitionFunction(rangeDistRefList));
+
+    RelTraitSet traitsLeft = null;
+    RelTraitSet traitsRight = null;
+
+    if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+      traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+      traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(rangePartRight);
+    }
+
+    final RelNode convertedLeft = convert(left, traitsLeft);
+    final RelNode convertedRight = convert(right, traitsRight);
+
+    DrillJoinRelBase newJoin = null;
+
+    if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+      if (implementAsRowKeyJoin) {
+        newJoin = new RowKeyJoinPrel(join.getCluster(), traitsLeft,
+            convertedLeft, convertedRight, join.getCondition(),
+            join.getJoinType());
+      } else {
+        newJoin = new HashJoinPrel(join.getCluster(), traitsLeft,
+            convertedLeft, convertedRight, join.getCondition(),
+            join.getJoinType(), false /* no swap */,
+            null /* no runtime filter */,
+            true /* row-key join semantics: drives the join-restricted scan */, JoinControl.DEFAULT);
+      }
+    }
+    if (newJoin != null) {
+      call.transformTo(newJoin);
+    }
+  }
+
   protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
       PhysicalJoinType physicalJoinType,
       RelNode left, RelNode right,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 7577cf9..c7e8df0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -111,6 +111,13 @@ public class PlannerSettings implements Context{
       new OptionDescription("False disables the DECIMAL data type, including casting to DECIMAL and reading DECIMAL types from Parquet and Hive."));
   public static final OptionValidator HEP_OPT = new BooleanValidator("planner.enable_hep_opt", null);
   public static final OptionValidator HEP_PARTITION_PRUNING = new BooleanValidator("planner.enable_hep_partition_pruning", null);
+  public static final OptionValidator ROWKEYJOIN_CONVERSION = new BooleanValidator("planner.enable_rowkeyjoin_conversion",
+      new OptionDescription("Enables runtime filter pushdown(via rowkey-join) for queries that only filter on rowkeys"));
+  public static final RangeDoubleValidator ROWKEYJOIN_CONVERSION_SELECTIVITY_THRESHOLD =
+      new RangeDoubleValidator("planner.rowkeyjoin_conversion_selectivity_threshold", 0.0, 1.0,
+          new OptionDescription("Sets the selectivity (as a percentage) under which Drill uses a rowkey join for queries that only filter on rowkeys"));
+  public static final OptionValidator ROWKEYJOIN_CONVERSION_USING_HASHJOIN = new BooleanValidator("planner.rowkeyjoin_conversion_using_hashjoin",
+      new OptionDescription("Enables runtime filter pushdown(via hash-join) for queries that only filter on rowkeys"));
   public static final OptionValidator PLANNER_MEMORY_LIMIT = new RangeLongValidator("planner.memory_limit",
       INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES,
       new OptionDescription("Defines the maximum amount of direct memory allocated to a query for planning. When multiple queries run concurrently, each query is allocated the amount of memory set by this parameter.Increase the value of this parameter and rerun the query if partition pruning failed due to insufficient memory."));
@@ -305,6 +312,12 @@ public class PlannerSettings implements Context{
 
   public boolean isHepPartitionPruningEnabled() { return options.getOption(HEP_PARTITION_PRUNING.getOptionName()).bool_val;}
 
+  public boolean isRowKeyJoinConversionEnabled() { return options.getOption(ROWKEYJOIN_CONVERSION.getOptionName()).bool_val;}
+
+  public boolean isRowKeyJoinConversionUsingHashJoin() { return options.getOption(ROWKEYJOIN_CONVERSION_USING_HASHJOIN.getOptionName()).bool_val;}
+
+  public double getRowKeyJoinConversionSelThreshold() { return options.getOption(ROWKEYJOIN_CONVERSION_SELECTIVITY_THRESHOLD);}
+
   public boolean isHepOptEnabled() { return options.getOption(HEP_OPT.getOptionName()).bool_val;}
 
   public double getHashJoinSwapMarginFactor() {
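
A small worked example (numbers are illustrative) of how the 0.0-1.0 threshold above is
meant to be read. The comparison itself happens inside the conversion rule; the snippet
below only spells out the arithmetic, and 'settings' is an assumed PlannerSettings instance.

  // With planner.rowkeyjoin_conversion_selectivity_threshold at its shipped default of 0.01
  // (see drill-module.conf below), a row-key filter over a 10M-row table qualifies for
  // conversion only when it is estimated to select fewer than 10,000,000 * 0.01 = 100,000 rows.
  double threshold = settings.getRowKeyJoinConversionSelThreshold();   // 0.01 by default
  double estimatedSelectivity = 50_000d / 10_000_000d;                 // 0.005
  boolean eligibleForRowKeyJoin = estimatedSelectivity < threshold;    // true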
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrule.java
new file mode 100644
index 0000000..8ff2dd6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RowKeyJoinPrule.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.drill.exec.planner.logical.RowKeyJoinRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+public class RowKeyJoinPrule extends JoinPruleBase {
+
+  public static final RelOptRule INSTANCE = new RowKeyJoinPrule("Prel.RowKeyJoinPrule",
+      RelOptHelper.any(RowKeyJoinRel.class));
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RowKeyJoinPrule.class);
+
+  private RowKeyJoinPrule(String name, RelOptRuleOperand operand) {
+    super(operand, name);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+
+    final RowKeyJoinRel join = call.rel(0);
+    final RelNode left = join.getLeft();
+    final RelNode right = join.getRight();
+
+    if (!checkPreconditions(join, left, right, settings)) {
+      return;
+    }
+
+    try {
+      if (!settings.isRowKeyJoinConversionUsingHashJoin()) {
+        // For now, let's assume the rowkey join does not preserve collation
+        createRangePartitionRightPlan(call, join, PhysicalJoinType.HASH_JOIN, true,
+            left, right, null /* left collation */, null /* right collation */);
+      } else {
+        createRangePartitionRightPlan(call, join, PhysicalJoinType.HASH_JOIN, false,
+            left, right, null /* left collation */, null /* right collation */);
+      }
+    } catch (Exception e) {
+      logger.warn("Failed to convert RowKeyJoinRel to a physical plan", e);
+    }
+  }
+}
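
Put differently (a paraphrase of the branch above, not new behavior): the
planner.rowkeyjoin_conversion_using_hashjoin option only decides whether the
range-partitioned plan is implemented as a RowKeyJoinPrel or as a plain HashJoinPrel
driving the restricted scan. The same decision, condensed into a single call:

  // Equivalent to the if/else in onMatch(), wrapped in the same try/catch:
  boolean implementAsRowKeyJoin = !settings.isRowKeyJoinConversionUsingHashJoin();
  createRangePartitionRightPlan(call, join, PhysicalJoinType.HASH_JOIN, implementAsRowKeyJoin,
      left, right, null /* left collation */, null /* right collation */);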
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index f7d11f8..0e374cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -245,6 +245,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
 
       } else {
         final RelNode intermediateNode2;
+        final RelNode intermediateNode3;
         if (context.getPlannerSettings().isHepPartitionPruningEnabled()) {
 
           final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
@@ -266,7 +267,14 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
         }
 
         // Do Join Planning.
-        convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.JOIN_PLANNING, intermediateNode2);
+        intermediateNode3 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.JOIN_PLANNING, intermediateNode2);
+
+        if (context.getPlannerSettings().isRowKeyJoinConversionEnabled()) {
+          // Convert Join to RowKeyJoin, where applicable.
+          convertedRelNode = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.ROWKEYJOIN_CONVERSION, intermediateNode3);
+        } else {
+          convertedRelNode = intermediateNode3;
+        }
       }
 
       // Convert SUM to $SUM0
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 097b231..2acde8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -107,6 +107,9 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(PlannerSettings.HEP_OPT),
       new OptionDefinition(PlannerSettings.PLANNER_MEMORY_LIMIT),
       new OptionDefinition(PlannerSettings.HEP_PARTITION_PRUNING),
+      new OptionDefinition(PlannerSettings.ROWKEYJOIN_CONVERSION),
+      new OptionDefinition(PlannerSettings.ROWKEYJOIN_CONVERSION_USING_HASHJOIN),
+      new OptionDefinition(PlannerSettings.ROWKEYJOIN_CONVERSION_SELECTIVITY_THRESHOLD),
       new OptionDefinition(PlannerSettings.FILTER_MIN_SELECTIVITY_ESTIMATE_FACTOR),
       new OptionDefinition(PlannerSettings.FILTER_MAX_SELECTIVITY_ESTIMATE_FACTOR),
       new OptionDefinition(PlannerSettings.TYPE_INFERENCE),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4020e96..dfbbbcb 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -522,6 +522,9 @@ drill.exec.options: {
     planner.enable_hashjoin_swap: true,
     planner.enable_hep_opt: true,
     planner.enable_hep_partition_pruning: true,
+    planner.enable_rowkeyjoin_conversion: true,
+    planner.rowkeyjoin_conversion_using_hashjoin: false,
+    planner.rowkeyjoin_conversion_selectivity_threshold: 0.01,
     planner.enable_join_optimization: true,
     planner.enable_limit0_optimization: true,
     planner.enable_limit0_on_scan: true,
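
A hedged end-to-end way to exercise the new options from a client. All table, schema and
column names below are illustrative assumptions; only the option names and defaults come
from this patch.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class RowKeyJoinConversionExample {
    public static void main(String[] args) throws Exception {
      // Embedded/local Drillbit; adjust the JDBC URL for a real cluster.
      try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
           Statement stmt = conn.createStatement()) {
        stmt.execute("ALTER SESSION SET `planner.enable_rowkeyjoin_conversion` = true");
        stmt.execute("ALTER SESSION SET `planner.rowkeyjoin_conversion_selectivity_threshold` = 0.01");
        // A query that filters the primary table purely on its row key via an IN subquery;
        // with the options above, the plan should show a RowKeyJoin over a restricted scan.
        try (ResultSet rs = stmt.executeQuery(
            "EXPLAIN PLAN FOR " +
            "SELECT t.* FROM dfs.`/tables/orders` t " +
            "WHERE t._id IN (SELECT o.order_id FROM dfs.`/tmp/hot_orders` o)")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }
  }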